iT邦幫忙

2024 iThome 鐵人賽

DAY 29
1
DevOps

全端監控技術筆記---從Sentry到Opentelemetry系列 第 29

Day29--關於 Opentelemetry 中的 Batch Processor

  • 分享至 

  • xImage
  •  

前言

(到了29篇,突然不知道要寫什麼了...想到我自己之前對 batch process 的筆記也可以來改寫成一篇哈...)

透過前面一系列的 demo,我們知道Opentelemetry 這個開源標準主要是採集 Trace、Metric、Log 三種類型的遙測資料,並且大致了解它們的採集邏輯、並手寫實現了相應的 mini-sdk。此外,我們同時也 demo 了有關 Sentry 在錯誤捕捉、前端效能追蹤、全鏈路請求監控等,以及也手寫 demo 了相關邏輯。

然而在上述的情境中,其實很容易在很短的時間內發起許多請求、把相關數據傳送到指定服務。這麼一來,無論是在前端還是後端,在處理自身業務邏輯之外,還需要負擔觀測數據的請求傳送,可能會造成網路延遲、或資源被佔用。

這時,我們可以看到在 SDK 中,有 BatchProcessor這個處理方式,透過批量處理請求來優化這個問題。

主要邏輯

批量處理請求,代表一次處理一連串的請求。首先我們會將請求放在一個 Buffer 中(通常使用 queue ),等到某些條件達成的時候再一次處理。

而這個任務積壓也不能太多,不然會讓程式一瞬間要處理太多任務。通常會設一個 queue 最大值,如果積壓任務的數量大過最大值時,就要立即處理、或者丟棄新任務。

另外,如果請求一直沒有辦法達到最大值的話,是不是就沒辦法觸發處理請求?這樣的結果也是沒辦法接受的,所以往往還需要另外設定一個定時器,來定時處理在 queue 中積壓的請求。

因此,通常主要有兩個觸發批量處理的時刻:

  1. 請求積壓的數量超過 queue 容量最大值時,立即處理queue並且清空、讓新的任務進入進入 buffer queue
  2. 在一定的時間內要把 queue 處理並清空

而在 Opentelemetry-js 中的 BatchSpanProcessorBaseBatchLogRecordProcessorBase,有定義自己的觸發邏輯:當有 task (span / log)進入 buffer queue 的時候,就會考慮要不要 export:

  • 假如 buffer 裡的 task 數量超過設定好的每次批處理的最大數量,就立即flush
  • 如果沒有超過,就使用 setTimeout來異步 flush。
    • 在某些情況下,數據的生成速度可能較慢,未能快速達到批次導出的數量。如果我們每次只因為有少量數據就立刻導出,可能會造成頻繁的網絡請求或處理,增加網絡和系統的開銷。
    • 設置延遲允許更多的數據在這段時間內進入緩衝區,這樣能夠一次導出更多的數據,從而提高傳輸效率,減少資源消耗。
  • 如果 buffer 內的 task 數量到達最大值,Opentelemetetry是定義要 丟棄任務

實現一個簡單的 BatchProcessor

主要有四個要設定的常量

  • maxExportBatchSize
    • 定義每次批次處理的最大數量
    • demo中預設為 5
  • maxQueueSize
    • 定義了可以緩存的最大數量(queue 的容量大小)
    • demo中預設為 20
  • scheduledDelayMillis
    • 批次導出的timer 時間間隔
    • demo中預設為 1 秒
  • exportTimeoutMillis
    • 單次批次處理的超時時間,防止長時間處理影響應用性能。
    • demo中預設為 5 秒

然後加上 Buffer 以及一些 flag, 該 BatchProcessor 構造器為

class BatchProcessor {
    constructor(exporter) {
        this._exporter = exporter;
        this._maxExportBatchSize = 5;
        this._maxQueueSize = 10;
        this._scheduledDelayMillis = 1000; //ms
        this._exportTimeoutMillis = 5000; //ms

        this._buffer = [];
        this._timer = undefined;
        this._isExporting = false; //避免重複導出
    }
}

batch邏輯--- flushOneBatch

  • 定義超時的 timer:
const timer = setTimeout(() => {
    reject(new Error('Timeout'));
}, this._exportTimeoutMillis);
  • 查看 Buffer 內的任務數量有沒有超過maxExportBatchSize , 然後給 exporter 處理
let taskItems;
if (this._buffer.length <= this._maxExportBatchSize) {
    taskItems = this._buffer;
    this._buffer = [];
} else {
    taskItems = this._buffer.splice(0, this._maxExportBatchSize);
}

this._exporter(taskItems, () => {
    clearTimeout(timer);
    resolve();
});

flush 邏輯--- maybeStartTimer

  • 先定義 flush 函數,如果執行完 batch 後,Buffer裡面還有任務,那麼就再執行一次:
const flush = () => {
    ...
    this._flushOneBatch().finally(() => {
        ...
        if (this._buffer.length > 0) {
            ...
            this._maybeStartTimer();
        }
    });
};
  • 再來就是觸發 flush 的條件:
...
// 如果 buffer 內大於單次批次大小,立即flush---加速
if (this._buffer.length >= this._maxExportBatchSize) {
    return flush();
}
if (this._timer !== undefined) return;

// 允許延遲,讓更多任務進入 buffer;
this._timer = setTimeout(() => flush(), this._scheduledDelayMillis);

測試

我們先 mock 一個 exporter,它的作用就是打印所有的 task 名稱:

const mockExporter = (tasks, callback) => {
    Promise.all(
        tasks.map((task) => {
            return new Promise((resolve) => {
                console.log(task);
                resolve();
            });
        }),
    ).finally(() => {
        console.log('---finish export!!');
        callback();
    });
};

然後生成 30 個任務,為每個任務生成一個隨機的延遲時間,以模擬不同時間點的任務進入 BatchProcessor。

mockTasks.forEach((task, index) => {
    setTimeout(() => {
        console.log(`Pushing ${task} into the batch processor`);
        batchProcessor.addToBuffer(task);
    }, randomDelay() * index); // 隨機延遲推送任務到 processor
});

結果如下:

image

我們可以看到,輸出結果跟我們設定的一樣,任務為5個一個數量單位輸出,實現了簡易的批量處理!

小結

在本文中,我們依照 opentelemetry-js 中的 batch process 邏輯,實現了邏輯相近的批量處理。不過這個處理方式是在 SDK 中減少請求的次數,進而減緩 collector、處理遙測數據 server 的負擔。

但實際上,如果數據量上來,還是需要其他的優化手段如:

  • 將多個 Collector 節點組合,分散資料的處理負荷。
  • 使用 Kafka 等事件流處理系統,對資料進行即時的緩衝與處理。
  • ...

所以除了在 SDK 中使用批量處理,還需要視實際業務情況來增加額外的優化手段。

本文程式碼可以在此 Github repository 中查看。

ref

ChangeLog

  • 20241013--修改前文,補上小結
  • 20241002--初稿

上一篇
Day28--服務端/客戶端渲染與opentelememtry--以NextJS為例
下一篇
Day30--結語與完賽心得
系列文
全端監控技術筆記---從Sentry到Opentelemetry30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言